package com.ml4ai.core.stack.mq;

import com.rabbitmq.client.*;
import lombok.Builder;
import lombok.Data;
import lombok.SneakyThrows;

import java.io.Closeable;
import java.io.IOException;
import java.util.Map;
import java.util.function.Function;

/**
 * Created by leecheng on 2018/11/14.
 */
@Builder
@Data
public class RabbitMQAgent implements Closeable {

    private String host;

    private Integer port;

    private String vHost;

    private String user;

    private String password;

    private ConnectionFactory connectionFactory;

    private Connection connection;

    private Channel commandChannel;

    @SneakyThrows
    public void init() {
        if (connectionFactory == null) {
            connectionFactory = new ConnectionFactory();
            connectionFactory.setVirtualHost(vHost);
            connectionFactory.setHost(host);
            connectionFactory.setPort(port);
            connectionFactory.setUsername(user);
            connectionFactory.setPassword(password);
        }
        if (!(connection != null && connection.isOpen())) {
            connection = connectionFactory.newConnection();
        }
    }

    @SneakyThrows
    public Channel getWorkChannel() {
        if (connection == null || !connection.isOpen()) {
            init();
        }
        return connection.createChannel();
    }

    @SneakyThrows
    public Channel getCommandChannel() {
        if (commandChannel == null || !commandChannel.isOpen() || connection == null || !connection.isOpen()) {
            init();
            commandChannel = connection.createChannel();
        }
        return commandChannel;
    }

    @SneakyThrows
    public void declareExchange(String exchange, BuiltinExchangeType exchangeType, boolean persistent) {
        getCommandChannel().exchangeDeclare(exchange, exchangeType, persistent);
    }

    @SneakyThrows
    public void declareQueue(String queue, boolean persistent, boolean exclusive, boolean autoDelete, Map<String, Object> parameter) {
        getCommandChannel().queueDeclare(queue, persistent, exclusive, autoDelete, parameter);
    }

    @SneakyThrows
    public void bindQueue(String queue, String exchange, String routeKey, Map<String, Object> parameter) {
        getCommandChannel().queueBind(queue, exchange, routeKey, parameter);
    }

    @SneakyThrows
    public void bindExchange(String target, String root, String rootKey, Map<String, Object> parameter) {
        getCommandChannel().exchangeBind(target, root, rootKey, parameter);
    }

    @SneakyThrows
    public void deleteQueue(String queue) {
        getCommandChannel().queueDelete(queue);
    }

    @SneakyThrows
    public void deleteExchange(String... a) {
        for (String e : a) {
            getCommandChannel().exchangeDelete(e);
        }
    }

    @SneakyThrows
    public void produceText(String exchange, String routeKey, String message, boolean persistent) {
        Channel channel = getWorkChannel();
        channel.basicPublish(exchange, routeKey, persistent ? MessageProperties.PERSISTENT_TEXT_PLAIN : MessageProperties.TEXT_PLAIN, message.getBytes("UTF-8"));
        channel.abort();
    }

    @SneakyThrows
    public void produce(String exchange, String routeKey, byte[] message, boolean persistent) {
        Channel channel = getWorkChannel();
        channel.basicPublish(exchange, routeKey, persistent ? MessageProperties.PERSISTENT_BASIC : MessageProperties.BASIC, message);
        channel.abort();
    }

    @SneakyThrows
    public void consumeText(String queue, int count, Function<String, Boolean> consumerOperation) {
        Channel channel = getWorkChannel();
        channel.basicQos(count);
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                boolean ack = consumerOperation.apply(new String(body, "UTF-8"));
                if (ack) {
                    channel.basicAck(envelope.getDeliveryTag(), false);
                } else {
                    channel.basicReject(envelope.getDeliveryTag(), true);
                }
            }
        };
        channel.basicConsume(queue, false, consumer);
    }


    @SneakyThrows
    public void consume(String queue, int count, Function<byte[], Boolean> consumerOperation) {
        Channel channel = getWorkChannel();
        channel.basicQos(count);
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                boolean ack = consumerOperation.apply(body);
                if (ack) {
                    channel.basicAck(envelope.getDeliveryTag(), false);
                } else {
                    channel.basicReject(envelope.getDeliveryTag(), true);
                }
            }
        };
        channel.basicConsume(queue, false, consumer);
    }

    @SneakyThrows
    public void close() {
        if (connection != null && connection.isOpen()) {
            connection.close();
        }
    }

}
